Building a Crypto Backtester with the Coinbase API — Part 3: Market data
This article follows on from Part 2. If you haven’t read that part yet, please do so first and then come back.
Making the REST client
First, we need a way to contact Coinbase through the API. We’ll do that with the RESTClient from Coinbase’s Python package. REST (Representational State Transfer) is an architectural style for web APIs in which clients read and modify resources through standard HTTP requests. So let’s begin coding!
Create a file named rest_client.py (the imports later in this article expect it to live in a clients folder), then move on to typing the code:
Loading in the libraries
from coinbase.rest import RESTClient
import os
from dotenv import load_dotenv
We need Coinbase’s RESTClient to connect to the API, load_dotenv to read the .env file, and os to access the environment variables it loads (the API key and secret).
load_dotenv('.env') loads the variables defined in the .env file into the environment, and os.getenv('variable name') reads them back. The Coinbase client only needs the API key and the secret as its arguments. Combining all this into a single class, we get the following:
class RESTClientWrapper:
    """
    A wrapper class for the Coinbase RESTClient to handle API interactions using environment variables for credentials.
    """
    def __init__(self):
        """
        Initializes the RESTClientWrapper instance by setting up the RESTClient with API credentials from environment variables.
        """
        # Loading in the environment
        load_dotenv(".env")
        # Retrieve API key and secret from environment variables
        api_key_name = os.getenv("COINBASE_API_NAME")
        api_key_secret = os.getenv("COINBASE_API_SECRET")
        # Initialize the RESTClient with the retrieved API credentials
        self.client = RESTClient(api_key=api_key_name, api_secret=api_key_secret)
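One thing to keep in mind: os.getenv returns None when a variable is missing, so a typo in the .env file only shows up later, when the client tries to authenticate. Here is a minimal sketch of a fail-fast check. The helper name require_env is my own invention for illustration and is not part of the Coinbase package or python-dotenv.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error.

    `require_env` is a hypothetical helper, not part of the article's code.
    """
    value = os.getenv(name)
    if not value:
        # Covers both a missing variable (None) and an empty string.
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Possible usage inside RESTClientWrapper.__init__, after load_dotenv(".env"):
#   api_key_name = require_env("COINBASE_API_NAME")
#   api_key_secret = require_env("COINBASE_API_SECRET")
```

With this in place, a missing key stops the program at startup with a message naming the variable, rather than failing on the first API call.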
Fetching the market: Client poller
Next, we’ll create a client poller to fetch the market data. We’ll start by creating a new file named rest_client_poller.py in the data_collection folder. This file will contain our main market-fetching function. Here’s our plan:
- Load the required libraries: We need various libraries for visualization, trading strategies, and utilities.
- Create an initialization function: This function will set up our client, a DataFrame to store market data, and the visualizer.
- Fetch market data: A function to retrieve market data for a given symbol.
- Extract trade details: A function to extract key trade details from the market data.
- Extract market information: A function to get best bid and ask prices from the market data.
- Update the DataFrame: A function to update our DataFrame with new data.
- Auto-update decorator: A decorator to automatically update the DataFrame after fetching new data.
- Main query function: A function to fetch and process market data at intervals.
Loading in the libraries we need:
from clients.rest_client import RESTClientWrapper
from visualization.visualizer import Visualizer
from trading_strategies.strategy import Strategy
from utility.utils import format_time_duration, ensure_directory_exists
import time
import pandas as pd
The init function
class RESTClientPoller:
    def __init__(self, strategy: Strategy, interval: int):
        """
        Initializes the RESTClientPoller with a specific trading strategy.
        Args:
        strategy (Strategy): The trading strategy to be used with the poller.
        interval (int): The interval in seconds between queries.
        """
        self.wrapper = RESTClientWrapper() # REST client wrapper instance
        self.data = pd.DataFrame() # DataFrame to store market data
        self.visualizer = Visualizer(
            signals=strategy.signals_to_graph
        ) # Visualizer for data visualization
        self.strategy = strategy # Trading strategy instance
        self.interval = interval # Seconds to wait between queries
        self.quantity = 1 # Default trade quantity; overwritten by run_query_on_interval
This function sets up our poller with a trading strategy and a polling interval, initializes the RESTClientWrapper, creates an empty DataFrame to store market data, and sets up the visualizer. I’m using a pandas DataFrame because most indicators need previous data points, so we can simply append each new data point until the frame reaches the size we need. Pandas DataFrames are also easy to work with.
Fetch Market function
Next, we’ll fetch market data for a given symbol. The symbol for Bitcoin traded against USD Coin (USDC) is BTC-USDC.
    def fetch_market_data(self, symbol: str) -> dict:
        """
        Retrieves market data for a given symbol.
        Args:
        symbol (str): The market symbol to fetch data for.
        Returns:
        dict: The market data retrieved from the API.
        """
        # limit=1 asks for only the most recent trade
        return self.wrapper.client.get_market_trades(product_id=symbol, limit=1)
Extracting the information from the response:
The response from the Coinbase API is a dictionary with various keys and values. We’ll extract the values we need from it with the extract_trade_details and extract_market_info functions:
    def extract_trade_details(self, result: dict) -> dict:
        """
        Extracts trade details from the market data.
        Args:
        result (dict): The market data from which to extract trade details.
        Returns:
        dict: A dictionary containing key trade details.
        """
        trade = result["trades"][0]
        return {
            "product_id": trade["product_id"],
            "price": float(trade["price"]),
            "size": float(trade["size"]),
            "time": pd.to_datetime(trade["time"]),
            "side": trade["side"],
        }
    def extract_market_info(self, result: dict) -> dict:
        """
        Extracts market conditions from the data.
        Args:
        result (dict): The market data from which to extract market conditions.
        Returns:
        dict: A dictionary containing key market conditions.
        """
        return {
            "best_bid": float(result["best_bid"]),
            "best_ask": float(result["best_ask"]),
        }
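Both extractors assume the response is complete: an empty trades list raises an IndexError, and a missing or blank price raises a KeyError or ValueError. Below is a minimal sketch of a more defensive, standalone variant that returns None when anything is missing. The function name safe_extract and the sample payload are made up for illustration; the payload only mirrors the keys the code above reads and is not a recorded API response.

```python
from typing import Optional


def safe_extract(result: dict) -> Optional[dict]:
    """Combine trade and quote fields, or return None if the response is incomplete."""
    trades = result.get("trades") or []
    if not trades:
        return None  # no trade came back for this poll
    trade = trades[0]
    try:
        return {
            "product_id": trade["product_id"],
            "price": float(trade["price"]),
            "size": float(trade["size"]),
            # Left as a string here; the article parses it with pd.to_datetime.
            "time": trade["time"],
            "side": trade["side"],
            "best_bid": float(result["best_bid"]),
            "best_ask": float(result["best_ask"]),
        }
    except (KeyError, TypeError, ValueError):
        return None  # a field was missing or not numeric


# Hypothetical payload with invented values, shaped like the keys read above.
sample = {
    "trades": [
        {
            "product_id": "BTC-USDC",
            "price": "64000.5",
            "size": "0.01",
            "time": "2024-01-01T00:00:00+00:00",
            "side": "BUY",
        }
    ],
    "best_bid": "64000.0",
    "best_ask": "64001.0",
}
```

The caller can then skip a poll when safe_extract returns None instead of crashing the whole loop.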
Function to run our strategy:
    def run_strategy(self):
        """
        Processes the latest market data using the assigned trading strategy.
        """
        self.strategy.process_data(self.data, self.quantity)
We’ll send the data over to the strategy’s process_data function, where we can handle it. The quantity it receives, self.quantity, is assigned in run_query_on_interval, shown later.
Function to update dataframe:
After extracting the data from the response, we need to append it to the DataFrame. We’ll also pass the data to the visualizer to graph it and to the strategy to run the indicators.
    def update_data_frame(self, row: dict, persistence: int):
        """
        Updates and trims the DataFrame to maintain a specified number of data points.
        Args:
        row (dict): The new data row to add to the DataFrame.
        persistence (int): The number of data points to persist in the DataFrame.
        """
        new_row = pd.DataFrame([row])
        if len(self.data) >= persistence:
            self.data = self.data.iloc[1:].reset_index(drop=True)
        self.data = pd.concat([self.data, new_row], ignore_index=True)
        self.visualizer.set_data(self.data)
        self.run_strategy()
Here, persistence is the number of data points we want to keep in the DataFrame. Since a DataFrame has no built-in size limit, it would otherwise keep growing and clog up memory. With the persistence argument we can keep the past 50 or 60 points, or however many we need: once the DataFrame is full, the oldest row is dropped before the new one is appended.
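The trimming rule is the same as a fixed-size buffer that discards its oldest entry when a new one arrives. As a quick illustration of the idea (not the article’s implementation, which uses pandas), here is a sketch with collections.deque from the standard library. The name make_buffer is hypothetical.

```python
from collections import deque


def make_buffer(persistence: int) -> deque:
    """Create a buffer that keeps only the newest `persistence` rows."""
    return deque(maxlen=persistence)


# Keep at most 3 rows; the prices are arbitrary numbers for the demo.
buffer = make_buffer(3)
for price in [1.0, 2.0, 3.0, 4.0]:
    buffer.append({"price": price})  # the oldest row is dropped automatically

prices = [row["price"] for row in buffer]
```

After the fourth append, the row with price 1.0 is gone and the buffer holds 2.0, 3.0, and 4.0, which is the same state update_data_frame would leave with persistence=3. If a DataFrame is needed afterwards, pd.DataFrame(list(buffer)) builds one from the stored dictionaries.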
Updating the data as a decorator:
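For fun, let’s write a decorator that updates the data every time we run the main query function, which comes right after this. It has to be defined before run_query, because the @auto_update_data line is evaluated when run_query is defined. One caveat: the wrapper reads persistence from kwargs, so it only sees the value when persistence is passed as a keyword argument; a positional value falls back to the default of 10.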
def auto_update_data(func):
    """
    A decorator to automatically update the DataFrame after fetching new data.
    Args:
    func (function): The function to wrap.
    Returns:
    function: The wrapper function.
    """
    def wrapper(self, *args, **kwargs):
        combined_data = func(self, *args, **kwargs)
        # Update DataFrame with new data, default persistence is 10
        self.update_data_frame(combined_data, kwargs.get("persistence", 10))
        return combined_data
    return wrapper
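Because the wrapper looks up persistence with kwargs.get, a call such as run_query("BTC-USDC", 50) would silently use 10. One possible way around this, sketched below, is to bind the call’s arguments against the wrapped function’s signature with inspect.signature, so the value is found whether it was passed by position, by keyword, or left at its default. The names auto_update and DemoPoller are hypothetical stand-ins for illustration; DemoPoller only records what it is asked to store.

```python
import functools
import inspect


def auto_update(func):
    """Variant of the decorator that resolves `persistence` however it was passed."""
    signature = inspect.signature(func)

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        bound = signature.bind(self, *args, **kwargs)
        bound.apply_defaults()  # fill in defaults declared on the function
        self.update_data_frame(result, bound.arguments["persistence"])
        return result

    return wrapper


class DemoPoller:
    """Hypothetical stand-in for the poller; no network or pandas involved."""

    def __init__(self):
        self.calls = []

    def update_data_frame(self, row, persistence):
        self.calls.append((row, persistence))

    @auto_update
    def run_query(self, symbol, persistence=10):
        return {"product_id": symbol}
```

This sketch assumes the decorated function has a parameter named persistence. The original decorator works fine as long as callers always pass persistence by keyword, which is what run_query_on_interval does.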
Main run-query
We’ll take in the symbol and the persistence, fetch the market data, extract the details, and auto-update the data from the decorator:
    @auto_update_data
    def run_query(self, symbol: str, persistence: int = 10) -> dict:
        """
        Fetches and processes market data for a specified symbol.
        Args:
        symbol (str): The market symbol to query data for.
        persistence (int): The number of data points to persist in the DataFrame.
        Returns:
        dict: A dictionary containing combined trade details and market information.
        """
        result = self.fetch_market_data(symbol)
        trade_details = self.extract_trade_details(result)
        market_info = self.extract_market_info(result)
        combined_data = {**trade_details, **market_info}
        return combined_data
Running the query at an interval:
This runs the run_query function forever, with a time.sleep() call between queries to set the frequency. The delay is the interval we passed to the constructor. This is the function we’ll call from our main.py file. It takes the symbol, the persistence, the quantity to trade, and whether or not to show the graph.
    def run_query_on_interval(
        self, symbol: str, show_graph: bool, persistence: int, quantity: int = 1
    ):
        """
        Periodically executes queries and optionally displays a graph.
        The delay between queries is self.interval, set in the constructor.
        Args:
        symbol (str): The market symbol to query data for.
        show_graph (bool): Whether to display the graph after each query.
        persistence (int): The number of data points to persist in the DataFrame.
        quantity (int): The quantity to be used in trading strategies.
        """
        self.quantity = quantity
        self.visualizer.set_title(symbol)
        while True:
            self.run_query(symbol=symbol, persistence=persistence)
            if show_graph:
                self.visualizer.draw_graph()
            time.sleep(self.interval)
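A while True loop like this stops at the first failed request and can only be ended by interrupting the program. If you want something easier to test, here is a minimal sketch of a bounded polling loop. The poll function, its max_iterations parameter, and the injectable sleep argument are all assumptions of mine for illustration, not part of the article’s poller.

```python
import time
from typing import Callable, Optional


def poll(
    query: Callable[[], object],
    interval: float,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `query` repeatedly, pausing `interval` seconds after each call.

    Returns the number of successful calls. Stops after `max_iterations`
    attempts, or on Ctrl+C when `max_iterations` is None.
    """
    successes = 0
    attempts = 0
    try:
        while max_iterations is None or attempts < max_iterations:
            attempts += 1
            try:
                query()
                successes += 1
            except Exception as exc:  # keep polling after a failed request
                print(f"query failed: {exc}")
            sleep(interval)
    except KeyboardInterrupt:
        print("stopped by user")
    return successes


# Possible usage with the poller from this article (names as defined above):
#   poll(lambda: poller.run_query(symbol="BTC-USDC", persistence=50), interval=5)
```

Passing sleep as an argument lets a test replace it with a function that returns immediately, so the loop can be exercised without waiting.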
And there you have it! We’re done with a major part of our backtester: collecting the market data. All that’s left is data visualization and strategy implementation. Stay tuned for the next part!